package com.zkyt.lib_msdk_ext.common

import com.zkyt.lib_msdk_ext.log.SDKExtLog
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable
import io.reactivex.rxkotlin.addTo
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.BehaviorSubject
import io.reactivex.subjects.Subject
import java.lang.reflect.Method
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit

/**
 * Created by chenyu on 2024/2/2
 * Description: Turns one-shot [Single] getters into shared, periodically refreshed streams.
 *
 * For every distinct method signature a polling job invokes the getter at a fixed interval and
 * publishes each result into a [BehaviorSubject]. Callers asking for the same signature share
 * that subject. The polling job is disposed once the last subscriber of the subject disposes.
 *
 * [getAutoGetSubject] and the internal teardown are `@Synchronized` on this instance.
 */
class AutoGetProcessor {
    // Intentionally never cleared for now, so that an aircraft disconnect does not
    // automatically cancel listeners that are still in use.
    private val compositeDisposable = CompositeDisposable()

    /** Polling state per method key: the subject carrying polled values, paired with its polling job. */
    private val autoGetSubjects = ConcurrentHashMap<String, Pair<Subject<Optional<Any>>, Disposable?>>()

    /**
     * Returns a stream of the values periodically fetched for [callMethod].
     *
     * The first call for a given method signature starts the polling job immediately (before
     * anyone subscribes); later calls reuse the existing subject. The cache key is built from the
     * method name, parameter types and return type only - argument values are not part of it, so
     * calls that differ only in arguments share one subject and the getter of the first call.
     *
     * Errors raised by [getMethod] are logged and the polling is restarted via `retry()`.
     *
     * @param callMethod method used to derive the cache key; `null` maps to a single shared key.
     * @param getMethod factory for the one-shot read; only used when this call creates the polling job.
     * @param interval polling period in milliseconds; only used when this call creates the polling job.
     * @return an observable of the subject's values cast to `Optional<T>`; disposing its last
     * subscriber stops the polling job and removes the cache entry.
     */
    @Suppress("UNCHECKED_CAST")
    @Synchronized
    fun <T> getAutoGetSubject(
        callMethod: Method?,
        getMethod: () -> Single<T>,
        interval: Long = INTERVAL_GET
    ): Observable<Optional<T>> {
        val keyMethodName = getMethodKey(callMethod)
        val subject = autoGetSubjects[keyMethodName]?.first
            ?: startPolling(keyMethodName, getMethod, interval)

        return subject
            .hide()
            .doOnDispose { handleAutoGetSubjectDispose(keyMethodName, subject) }
            .map { it as Optional<T> }
    }

    /**
     * Pushes an empty [Optional] into every registered subject.
     *
     * Polling jobs are left running, so the next successful poll publishes a fresh value again.
     */
    fun clearCache() {
        autoGetSubjects.values.forEach { (subject, _) ->
            subject.onNext(Optional.empty())
        }
    }

    /**
     * Creates the subject and the polling job for [keyMethodName] and registers both.
     *
     * Must only be called from [getAutoGetSubject], which holds the instance lock.
     */
    private fun <T> startPolling(
        keyMethodName: String,
        getMethod: () -> Single<T>,
        interval: Long
    ): Subject<Optional<Any>> {
        // Must be a BehaviorSubject: handleAutoGetSubjectDispose looks up its
        // "subscriberCount" method by reflection.
        val subject = BehaviorSubject.create<Optional<Any>>()
        val intervalJob = Observable.interval(interval, TimeUnit.MILLISECONDS)
            .flatMapSingle { getMethod() }
            .doOnNext { subject.onNext(Optional.ofNullable(it)) }
            .subscribeOn(Schedulers.io())
            .doOnError {
                SDKExtLog.d("getAutoGetSubject, doOnError, keyMethodName: $keyMethodName, \n error: ${it.stackTraceToString()}")
            }
            .retry()
            .subscribe()
            .addTo(compositeDisposable)

        autoGetSubjects[keyMethodName] = Pair<Subject<Optional<Any>>, Disposable?>(subject, intervalJob)
        return subject
    }

    /**
     * Stops polling for [keyMethodName] when the subscriber being disposed is the last one of [subject].
     */
    @Synchronized
    private fun handleAutoGetSubjectDispose(keyMethodName: String, subject: Subject<Optional<Any>>) {
        val entry = autoGetSubjects[keyMethodName] ?: return
        // A disposal may come from a subject that was already torn down and replaced under the
        // same key; it must not cancel the polling job of the subject registered now.
        if (entry.first !== subject) return

        // When doOnDispose fires, the subject still reports the observer that is being disposed
        // (hasObservers is still true), so a count of 1 means the last subscriber is leaving.
        val observerCount = subject.javaClass.getDeclaredMethod("subscriberCount")
            .apply { isAccessible = true }
            .invoke(subject) as Int

        if (observerCount <= 1) {
            // CompositeDisposable.remove also disposes the polling job.
            entry.second?.let { compositeDisposable.remove(it) }
            autoGetSubjects.remove(keyMethodName)
        }
    }

    /**
     * Builds the cache key from the method name, parameter types and return type.
     *
     * The parameter types are rendered with `contentToString()`: a plain `toString()` on the
     * array would embed its identity hash and produce a different key on every call.
     */
    private fun getMethodKey(curMethod: Method?): String {
        val parameterTypes = curMethod?.parameterTypes?.contentToString()
        return "${curMethod?.name}$parameterTypes${curMethod?.returnType}"
    }

    companion object {
        /** Default polling period in milliseconds. */
        private const val INTERVAL_GET = 1000L
    }
}